Distributed Python code

In this second tutorial, we look into distributing Python code on High Performance Computing (HPC) clusters. These clusters are collections of compute nodes, each with multiple CPUs and cores, coordinated by a main scheduler that distributes tasks and manages resources. The cluster we are going to connect to is located at Simon Fraser University and named Cedar, after BC's official tree, the Western Red Cedar. The resources of this cluster, like those of many others, are managed by the Simple Linux Utility for Resource Management (SLURM) job scheduler. Therefore, we need to comply with its specifications when submitting jobs to this cluster.

For more information on how to connect to Cedar, check the Alliance website here.

In this tutorial, we are going to:

  1. Run parallel Python code on a single node
  2. Run distributed Python code on multiple nodes

Both of these applications involve CPU operations; we will only briefly touch on distributed GPU tasks at the end of the tutorial. Let's get started!

Single node vs Multi-node

Single-node jobs are the easiest, because they don't require inter-node communication. Many Python libraries and modules for task parallelization that are designed for local machine usage can therefore also be used on a single node (e.g. multiprocessing, joblib). However, they are not suited to dispatching tasks across multiple cluster nodes.
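
As a point of reference, here is a minimal single-node sketch using the standard library's multiprocessing module; the square function is just a placeholder workload:

```python
from multiprocessing import Pool

def square(x):
    """Placeholder CPU-bound task."""
    return x * x

if __name__ == "__main__":
    # spawn a pool of worker processes on the local machine (or single node)
    with Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

This pattern uses the cores of one machine only; nothing here knows how to reach a second node, which is exactly the limitation dask addresses below.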

For multi-node CPU tasks, dask is a great Python package for scalable workloads, whether the requested resources fit within a single node or span multiple nodes. It is designed to work with big dataframes and arrays in a lazy fashion, and can be easily adopted to parallelize your Python code. Note, however, that more parallelism doesn't necessarily translate into greater performance: parallelization comes with additional overhead, which is only worth paying when the tasks can actually benefit from it.
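
To get a feel for the lazy model before involving a cluster, here is a small local sketch with dask.array; the array shape and chunk sizes are arbitrary choices for illustration:

```python
import dask.array as da

# a lazy array split into 16 chunks; no values are materialized yet
x = da.ones((1000, 1000), chunks=(250, 250))

# build up a task graph lazily: still nothing is computed
total = (x + 1).sum()

# trigger the actual computation
print(total.compute())  # 2000000.0
```

Until compute() is called, dask only records the operations in a task graph, which is what later lets it ship the work to cluster workers instead of running it locally.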

Setting up dask to communicate with the cluster is as easy as calling a single function! To do that, we also need to install dask_jobqueue, the recommended package for communicating with HPC job schedulers (many are supported). The dask API is vast and offers many options to parallelize your code and submit tasks to a cluster; take your time to familiarize yourself with it.
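
Assuming a pip-based environment, the relevant packages can be installed with something like:

```shell
pip install dask distributed dask-jobqueue
```

On Alliance clusters you would typically do this inside a virtual environment on the login node.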

In this example, we are working with a sizeable array of 2.5 billion cells that we don't want to, or cannot, keep in memory, and we'll perform some basic operations on it for demonstration. dask has a nice numpy-like interface, so it is pretty familiar to work with. Importantly, because the computations are done lazily, similarly to polars, we only obtain a result when we actively ask for it. That is, we dump all our expensive computations on the cluster and collect a single result at the end on our login node.

# ssh into your login node
# ...

# from within the login node, prepare the script
import dask.array as da
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# initialize an array of random floats and chunk it for blockwise operations
array = da.random.random((50_000, 50_000), chunks=(2000, 2000))

# do some computation
def add(x):
    return x + 1

# open connection with cluster in context manager to automatically close
# the connection upon completion
# the SLURM configuration is saved on your local disk, so if you modify
# it there, you don't need to request the resources again in this call
with SLURMCluster(
    account="def-accountname",
    cores=10,
    memory="10 GB"
) as cluster, Client(cluster) as client:
    # request 1 worker
    cluster.scale(1)
    # map the function to each chunk of the array
    array1 = array.map_blocks(add)
    result1 = array1.compute()
    # apply another function to `array` using the dask.array API
    array2 = array.sum()
    result = array2.compute()

In other cases, when calling functions like map or submit, dask returns a future, i.e. a promise that a computation will be carried out. In that case, we have to wait until our job passes through the queue (pending state) before we can see the future's status as finished.
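
As a local sketch of the future-based interface (using an in-process client so it runs without a job queue; with SLURMCluster the calls look the same, but results arrive only once workers clear the queue):

```python
from dask.distributed import Client

def add(x, y):
    return x + y

if __name__ == "__main__":
    # an in-process client: no SLURM involved, work runs in local threads
    with Client(processes=False) as client:
        future = client.submit(add, 1, 2)  # returns immediately with a future
        print(future.result())             # blocks until the result is ready: 3
```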

One other thing you may have noticed is that we can't really monitor how the computation is going, such as how much CPU or memory it is using. To solve this issue, dask offers a great dashboard to monitor the execution on the cluster. You need to establish an SSH tunnel to the cluster from your local workstation; then, once we call SLURMCluster (or any other jobqueue interface), we can start monitoring the workload distribution and progress. Ultimately, it would look something like the following.
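
The tunnel itself is a plain SSH port forward; the username is a placeholder, the hostname may differ on your cluster, and 8787 is dask's default dashboard port:

```shell
# forward the remote dashboard port 8787 to localhost:8787
ssh -L 8787:localhost:8787 username@cedar.alliancecan.ca
# then open http://localhost:8787/status in your local browser
```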

Workers being deployed

Cluster status

In case we require more computing power, we can scale our cluster further by requesting more workers with the scale method of cluster. This would also increase our wait time in the queue.
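
Scaling looks the same on a local cluster, which we can use to sketch the call (threads-based here so it runs anywhere):

```python
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # a local stand-in for SLURMCluster; scale() works the same way
    with LocalCluster(n_workers=1, processes=False) as cluster, Client(cluster) as client:
        cluster.scale(3)            # request two more workers
        client.wait_for_workers(3)  # block until they are registered
        print(len(cluster.workers)) # 3
```

On a SLURMCluster you can also let dask grow and shrink the pool automatically with cluster.adapt(minimum=..., maximum=...) instead of a fixed scale() call.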

GPU nodes

Dask offers an interface for GPU applications on clusters via the dask-cuda package. However, at this stage, GPU support seems to be limited to Linux-based systems. Alternatively, other libraries, including PyTorch, offer APIs for distributed workloads. In these cases, we write a Python script that contains the model we want to run. Next, we provide SLURM with an executable bash script where we specify the type of resources we want to allocate for the workload.
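
A minimal submission script might look like the following; the account, resource numbers, module version, and script name are all placeholders to adapt to your workload:

```shell
#!/bin/bash
#SBATCH --account=def-accountname   # placeholder account
#SBATCH --gpus-per-node=1           # request one GPU on the node
#SBATCH --mem=16G                   # memory for the job
#SBATCH --time=0-01:00              # time limit (D-HH:MM)

module load python/3.11             # module name depends on the cluster
source ~/env/bin/activate           # activate your virtual environment
python train_model.py               # hypothetical training script
```

You would then submit it with sbatch job.sh and monitor it with squeue.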

For more information on how to work with GPU-based tasks, join our next workshop on March 24th!